Welcome to aiokafka’s documentation! 您所在的位置:网站首页 kafka consumer python Welcome to aiokafka’s documentation!

Welcome to aiokafka’s documentation!

2024-02-02 14:16| 来源: 网络整理| 查看: 265

Welcome to aiokafka’s documentation!¶ https://img.shields.io/badge/kafka-1.0%2C%200.11%2C%200.10%2C%200.9-brightgreen.svg https://img.shields.io/pypi/pyversions/aiokafka.svg https://img.shields.io/badge/license-Apache%202-blue.svg

aiokafka is a client for the Apache Kafka distributed stream processing system using asyncio. It is based on the kafka-python library and reuses its internals for protocol parsing, errors, etc. The client is designed to function much like the official Java client, with a sprinkling of Pythonic interfaces.

aiokafka can be used with 0.9+ Kafka brokers and supports fully coordinated consumer groups – i.e., dynamic partition assignment to multiple consumers in the same group.

Getting started¶ AIOKafkaConsumer¶

AIOKafkaConsumer is a high-level message consumer, intended to operate as similarly as possible to the official Java client.

Here’s a consumer example:

from aiokafka import AIOKafkaConsumer import asyncio async def consume(): consumer = AIOKafkaConsumer( 'my_topic', 'my_other_topic', bootstrap_servers='localhost:9092', group_id="my-group") # Get cluster layout and join group `my-group` await consumer.start() try: # Consume messages async for msg in consumer: print("consumed: ", msg.topic, msg.partition, msg.offset, msg.key, msg.value, msg.timestamp) finally: # Will leave consumer group; perform autocommit if enabled. await consumer.stop() asyncio.run(consume())

Read more in Consumer client section.


AIOKafkaProducer is a high-level, asynchronous message producer.

Here’s a producer example:

from aiokafka import AIOKafkaProducer import asyncio async def send_one(): producer = AIOKafkaProducer( bootstrap_servers='localhost:9092') # Get cluster layout and initial topic/partition leadership information await producer.start() try: # Produce message await producer.send_and_wait("my_topic", b"Super message") finally: # Wait for all pending messages to be delivered or expire. await producer.stop() asyncio.run(send_one())

Read more in Producer client section.

Installation¶ pip install aiokafka


aiokafka requires the kafka-python library.

Optional LZ4 install¶

To enable LZ4 compression/decompression, install aiokafka with lz4 extra option:

pip install ‘aiokafka[lz4]’

Note, that on Windows you will need Visual Studio build tools, available for download from http://landinghub.visualstudio.com/visual-cpp-build-tools

Optional Snappy install¶

Download and build Snappy from http://google.github.io/snappy/


apt-get install libsnappy-dev


brew install snappy

From Source:

wget https://github.com/google/snappy/tarball/master tar xzvf google-snappy-X.X.X-X-XXXXXXXX.tar.gz cd google-snappy-X.X.X-X-XXXXXXXX ./configure make sudo make install

Install aiokafka with snappy extra option

pip install 'aiokafka[snappy]' Optional zstd indtall¶

To enable Zstandard compression/decompression, install aiokafka with zstd extra option:

pip install 'aiokafka[zstd]' Optional GSSAPI install¶

To enable SASL authentication with GSSAPI, install aiokafka with gssapi extra option:

pip install 'aiokafka[gssapi]' Source code¶

The project is hosted on GitHub

Please feel free to file an issue on bug tracker if you have found a bug or have some suggestion for library improvement.

The library uses Travis for Continious Integration.

Authors and License¶

The aiokafka package is Apache 2 licensed and freely available.

Feel free to improve this package and send a pull request to GitHub.


Producer client Message buffering Retries and Message acknowledgement Idempotent produce Transactional producer Returned RecordMetadata object Direct batch control Consumer client Offsets and Consumer Position Manual vs automatic committing Controlling The Consumer’s Position Storing Offsets Outside Kafka Consumer Groups and Topic Subscriptions Topic subscription by pattern Manual partition assignment Consumption Flow Control Reading Transactional Messages Detecting Consumer Failures Difference between aiokafka and kafka-python Why do we need another library? API differences and rationale Consumer has no poll() method Rebalances are happening in the background Prefetching is more sophisticated API Documentation Producer class Consumer class Helpers Abstracts SSL Authentication SASL Authentication Error handling Consumer errors Other references Errors Structs Protocols Examples Serialization and compression Manual commit Group consumer Custom partitioner SSL usage Local state consumer Batch producer Transactional Consume-Process-Produce






      CopyRight 2018-2019 实验室设备网 版权所有